package com.flink.demo.cep11;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

/**
 * description
 *
 * @author zsyoung@qq.com
 * 2020/7/24 23:16
 */
public class CEPDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1、构建输入源
        DataStreamSource source = env.fromElements(
                //浏览记录
                Tuple3.of("Marry", "外套", 1L),
                Tuple3.of("Marry", "帽子", 1L),
                Tuple3.of("Marry", "帽子", 2L),
                Tuple3.of("Marry", "帽子", 3L),
                Tuple3.of("Marry", "帽子", 6L),
                Tuple3.of("Ming", "衣服", 1L),
                Tuple3.of("Marry", "鞋子", 1L),
                Tuple3.of("Marry", "鞋子", 2L),
                Tuple3.of("LiLei", "帽子", 1L),
                Tuple3.of("LiLei", "帽子", 2L),
                Tuple3.of("LiLei", "帽子", 3L)
        );


        //2、定义Pattern,寻找连续搜索帽子的用户
        Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern.<Tuple3<String, String, Long>>begin("start")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return "帽子".equals(value.f1);
                    }
                })
//                .timesOrMore(3);
                //这里的test是随意命名的，要和下方的命名对应
                .next("test")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return "帽子".equals(value.f1);
                    }
                });

        //3、进行匹配
        KeyedStream keyedStream = source.keyBy(0);
        PatternStream patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator matchStream = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {

            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> map) throws Exception {

//                return map.toString();
                List<Tuple3<String, String, Long>> middle = map.get("test");
                return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "连续搜索两次帽子！";
            }
        });

        //4、输出匹配的信息
        matchStream.printToErr();
        env.execute("execute cep");
    }
}
